# Load Python libraries
import numpy as np
import math
from collections import Counter
from scipy.stats import entropy
# Load Plot libraries
import seaborn as sns
import matplotlib.pyplot as plt
# Class HuffmanCode from scratch
class HuffmanCode:
    """Huffman prefix-code construction from a symbol-probability distribution."""

    def get_code(self, p_symbols):
        """Return a Huffman code for an ensemble with distribution p_symbols.

        p_symbols: dict mapping symbols (strings) to probabilities/weights.
        Returns a dict mapping each symbol to its binary code string.
        The caller's dict is left unmodified (the original implementation
        normalized it in place).
        """
        n = len(p_symbols)
        if n == 0:
            return {}
        if n == 1:
            # Degenerate ensemble: a single symbol still needs one bit.
            return dict(zip(p_symbols.keys(), ['1']))
        # Normalize a copy so probabilities sum to 1 without mutating the input.
        p = self._normalize_weights(dict(p_symbols))
        return self._get_code(p)

    def _get_code(self, p):
        """(Private) Recursively compute the Huffman code for distribution p."""
        # Base case of only two symbols: assign 0 and 1 arbitrarily.
        if len(p) == 2:
            return dict(zip(p.keys(), ['0', '1']))
        # Merge the two lowest-probability symbols into one compound symbol
        # (string concatenation assumes string-typed symbols).
        p_prime = p.copy()
        s1, s2 = self._get_lowest_prob_pair(p)
        p1, p2 = p_prime.pop(s1), p_prime.pop(s2)
        p_prime[s1 + s2] = p1 + p2
        # Recurse on the reduced distribution, then split the merged symbol's
        # code word back into codes for its two constituents.
        code = self._get_code(p_prime)
        prefix = code.pop(s1 + s2)
        code[s1], code[s2] = prefix + '0', prefix + '1'
        return code

    def _get_lowest_prob_pair(self, p):
        """(Private) Return the two symbols with the lowest probabilities,
        or (None, None) when fewer than two symbols exist."""
        if len(p) >= 2:
            ordered = sorted(p.items(), key=lambda kv: kv[1])
            return ordered[0][0], ordered[1][0]
        return (None, None)

    def _normalize_weights(self, p_symbols, t_weight=1.0):
        """(Private) Scale the weights in place so they sum to t_weight.

        Returns the same dict for convenient chaining. math.isclose avoids
        pointless rescaling caused by float round-off; an all-zero
        distribution is left untouched instead of dividing by zero.
        """
        total = sum(p_symbols.values())
        if total and not math.isclose(total, t_weight):
            for s in p_symbols:
                p_symbols[s] = p_symbols[s] / total * t_weight
        return p_symbols
# Create the shared HuffmanCode instance used by the compression helpers below
hc = HuffmanCode()
# Function - Read file in low level (Bytes)
def get_file_bytes(file_path):
    """Read the file at file_path and return its raw content as a bytearray.

    Raises OSError if the file cannot be opened. (The original trailing
    `return None` was unreachable dead code and has been removed.)
    """
    # The context manager guarantees the file handle is closed even on error.
    with open(file_path, 'rb') as f:
        return bytearray(f.read())
# Function - Return shannon entropy
def entropy_shannon(labels, base=None):
    """Return the Shannon entropy of the label sequence.

    labels: sequence of values (anything np.unique accepts).
    base: logarithm base; None means natural log (scipy.stats.entropy default).
    """
    # Only the counts matter; the unique values themselves are discarded.
    _, counts = np.unique(labels, return_counts=True)
    return entropy(counts, base=base)
# Loading target file (path is relative to the notebook's location — adjust as needed)
file_path = "../data/text/book-1.txt"
text_byte_list = get_file_bytes(file_path)
print(len(text_byte_list), 'Bytes')
# Function - Calculates the compression percentage (%)
def calc_compression_percentage(curr_size, new_size):
    """Return how much smaller new_size is relative to curr_size,
    as a percentage rounded to two decimal places."""
    saved = curr_size - new_size
    return round(saved / curr_size * 100, 2)
# Calculate entropy of file (bits per byte, base 2)
curr_entropy = entropy_shannon(text_byte_list, 2)
curr_entropy
# Real compression percentage (%)
# Theoretical best case: entropy bits per symbol vs the 8 bits of a raw byte.
curr_size = 8
new_size = curr_entropy
compress_rate = calc_compression_percentage(curr_size, new_size)
print(compress_rate, '%')
# Function - Calculate code frequency
def get_term_freq(term_list):
    """Count occurrences of each term in term_list.

    Integer terms are converted to their character (chr) before being used
    as dictionary keys; all other terms are used as-is.
    """
    freq = {}
    for term, count in Counter(term_list).items():
        label = chr(term) if isinstance(term, int) else term
        freq[label] = count
    return freq
# Function - Build the compress file
def create_compress_file(byte_list, code_list):
    """Encode byte_list as one bit string.

    Each byte is looked up in code_list by its character (chr) and replaced
    by the corresponding code word; the code words are concatenated in order.
    """
    return "".join(code_list[chr(b)] for b in byte_list)
# Function - Compressing file
def get_compress_file(byte_list):
    """Huffman-compress byte_list.

    Returns a tuple (bit string of the encoded file, Huffman code table).
    """
    # Relative frequency of each symbol in the input.
    term_freq = get_term_freq(byte_list)
    total = sum(term_freq.values())
    term_freq = {term: count / total for term, count in term_freq.items()}
    # Derive the Huffman code and encode the bytes with it.
    h_code = hc.get_code(term_freq)
    return create_compress_file(byte_list, h_code), h_code
# Compressing initial text file
compress_file, h_code = get_compress_file(text_byte_list)
# Real compression percentage (%)
curr_size = len(text_byte_list)
# The encoded result is a string of '0'/'1' characters, so divide by 8 for bytes.
new_size = len(compress_file) / 8
compress_rate = calc_compression_percentage(curr_size, new_size)
print(compress_rate, '%')
# Below are the functions that find the lowest-entropy configuration for a binary file.
# Function - Find the low entropy setup for target file
def find_low_entropy(byte_list, neighborhood, verbose):
    """Search all 256 single-byte XOR keys for the transform with lowest entropy.

    Each candidate stream is built as key ^ byte, optionally XOR-chained with
    up to `neighborhood` previously transformed bytes.

    byte_list: sequence of ints in [0, 255].
    neighborhood: how many preceding transformed bytes to fold into each byte.
    verbose: print progress when True.
    Returns a dict with keys 'byte_list', 'entropy' and 'key' for the best find.
    """
    B = 256
    # Upper bound: entropy of a uniform distribution over B symbols (8 bits).
    best_entropy = math.log(B, 2)
    best_byte_list = []
    best_key = -1
    n = len(byte_list)
    if verbose:
        print(n, 'Bytes')
    for curr_key in range(B):
        curr_byte_list = []
        for i in range(n):
            curr_value = curr_key ^ byte_list[i]
            # Chain in the previous `neighborhood` transformed bytes; the
            # clamped range replaces the original j != i / j >= 0 guards.
            for j in range(max(0, i - neighborhood), i):
                curr_value = curr_value ^ curr_byte_list[j]
            curr_byte_list.append(curr_value)
        # Entropy of the transformed stream (read-only use: no copy needed).
        curr_entropy = entropy_shannon(curr_byte_list, 2)
        # Keep the lowest-entropy candidate seen so far.
        if curr_entropy < best_entropy:
            if verbose:
                print('key:', curr_key, ', last entropy:', best_entropy, ', new entropy:', curr_entropy)
            best_entropy = curr_entropy
            best_key = curr_key
            best_byte_list = curr_byte_list
    return {'byte_list': best_byte_list, 'entropy': best_entropy, 'key': best_key}
# Find file with low entropy (neighborhood=0 means a plain single-byte XOR search)
best_option = find_low_entropy(text_byte_list, 0, True)
# Show best entropy setup (notebook-style expression echo)
best_option['entropy'], best_option['key']
# Save best byte list
best_byte_list = best_option['byte_list']
# Function - Create byte matrix
def create_byte_matrix(byte_list, row_len, col_len):
    """Lay byte_list out row-major into a (row_len, col_len) float matrix.

    Trailing cells beyond len(byte_list) remain 0. Raises if byte_list has
    more than row_len * col_len elements.
    """
    matrix = np.zeros((row_len, col_len))
    data = np.asarray(byte_list, dtype=float)
    # Vectorized row-major fill replaces the original per-element Python loop.
    matrix.flat[:data.size] = data
    return matrix
# Function - Plot image in binary file
def plot_byte_matrix(matrix, plt_title):
    """Render the byte matrix as a labelled seaborn heatmap and show it."""
    figure, axis = plt.subplots(figsize=(14, 14))
    sns.heatmap(matrix, ax=axis)
    axis.set_title(plt_title, fontsize=16)
    axis.set_xlabel('columns', fontsize=12)
    axis.set_ylabel('rows', fontsize=12)
    plt.show()
# Create and plot original matrix
# NOTE(review): 1300 * 967 must be >= the file size in bytes — confirm for other files.
row_len = 1300
col_len = 967
ori_matrix = create_byte_matrix(text_byte_list, row_len, col_len)
plot_byte_matrix(ori_matrix, 'Matrix of Original File')
# Create and plot the new matrix (with low entropy)
row_len = 1300
col_len = 967
ori_matrix = create_byte_matrix(best_byte_list, row_len, col_len)
plot_byte_matrix(ori_matrix, 'Matrix of New File')
# Save the new (low-entropy) file as txt
temp_file_path = file_path.replace('.txt', '-new.txt')
with open(temp_file_path, 'w+b') as f:
    binary_format = bytearray(best_byte_list)
    f.write(binary_format)
# Below is the code that reverses the entropy-lowering transform.
# Restoring entropy process: undo the XOR transform applied by find_low_entropy
new_byte_list = []
neighborhood = 0
n = len(best_byte_list)
# Walk backwards so each byte is rebuilt before the ones that depend on it.
# NOTE(review): the new_byte_list[j] indexing assumes neighborhood == 0 here —
# for neighborhood > 0 the list is still in reversed order at that point; verify.
for i in range(n-1, 0, -1):
    curr_value = best_option['key'] ^ best_byte_list[i]
    for j in range(i - neighborhood, i + 1):
        if i != j and j >= 0:
            curr_value = curr_value ^ new_byte_list[j]
    new_byte_list.append(curr_value)
new_byte_list.append(best_option['key'] ^ best_byte_list[0])
new_byte_list = list(reversed(new_byte_list))
# Comparing size of byte lists (notebook expression echo; expected True)
len(new_byte_list) == len(text_byte_list)
# Comparing values of byte lists (expected 0 when restoration succeeded)
sum(new_byte_list) - sum(text_byte_list)
# Comparing entropies of byte lists (expected True)
entropy_shannon(text_byte_list, 2) == entropy_shannon(new_byte_list, 2)
# Create and plot the restored matrix (should look identical to the original file's plot)
row_len = 1300
col_len = 967
ori_matrix = create_byte_matrix(new_byte_list, row_len, col_len)
plot_byte_matrix(ori_matrix, 'Matrix of Original File')
# Save the restored (original-content) file as txt
temp_file_path = file_path.replace('.txt', '-original.txt')
with open(temp_file_path, 'w+b') as f:
    binary_format = bytearray(new_byte_list)
    f.write(binary_format)